{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Data Wrangler Export to SageMaker Pipelines Notebook\n",
    "\n",
    "You can use Amazon SageMaker Pipelines to create\n",
    "end-to-end workflows that manage and deploy SageMaker jobs. Pipelines\n",
    "come with SageMaker Python SDK integration, so you can build each step\n",
    "of your workflow using a Python-based interface.\n",
    "\n",
    "After your workflow is deployed, you can view the Directed Acyclic Graph\n",
    "(DAG) for your pipeline and manage your executions using Amazon SageMaker Studio.\n",
    "\n",
    "Use this notebook to create a SageMaker pipeline with a data preperation step,\n",
    "defined by your Data Wrangler flow.\n",
    "\n",
    "In this notebook, you will do the following:\n",
    "* Upload your Data Wrangler .flow file to S3 so that it can be used to define\n",
    "a processing job step.\n",
    "* Define a processing job step. This step is used to create a pipeline.\n",
    "* Define a pipeline that includes a data preperation steps defined by your\n",
    "Data Wrangler flow. Optionally, you can add additional steps to your pipeline.\n",
    "* Execute the pipeline and monitor its status using SageMaker Pipeline APIs."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Installing dependencies..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install sagemaker==2.23.1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# SageMaker Python SDK version 2.x is required\n",
    "import sagemaker\n",
    "import subprocess\n",
    "import sys\n",
    "\n",
    "original_version = sagemaker.__version__\n",
    "if sagemaker.__version__ != \"2.23.1\":\n",
    "    subprocess.check_call(\n",
    "        [sys.executable, \"-m\", \"pip\", \"install\", \"sagemaker==2.23.1\"]\n",
    "    )\n",
    "    import importlib\n",
    "    importlib.reload(sagemaker)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "import os\n",
    "import time\n",
    "import uuid\n",
    "\n",
    "import boto3\n",
    "import sagemaker"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Parameters\n",
    "The following lists parameters that are used throughout this notebook.\n",
    "You can, optionally, use the following cell to configure these variables:\n",
    "* `bucket` - The S3 bucket used to save the output returned\n",
    "from the processing job and the flow file you exported from Data Wrangler.\n",
    "* `prefix` - This is the prefix your .flow file is saved under in S3.\n",
    "* `flow_id` and `flow_name` - used to name your flow file when it is saved\n",
    "to S3.\n",
    "* `instance_type` - The instance type used in your processing job.\n",
    "* `output_content_type` - The format type used to save the output of the\n",
    "processing job.\n",
    "* `sagemaker_endpoint_url` - An endpoint URL used to communicate with SageMaker."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The S3 bucket and location used to save processing job outputs and your .flow file.\n",
    "# Specify a different bucket here if you wish.\n",
    "sess = sagemaker.Session()\n",
    "bucket = sess.default_bucket()\n",
    "prefix = \"data_wrangler_flows\"\n",
    "flow_id = f\"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}\"\n",
    "flow_name = f\"flow-{flow_id}\"\n",
    "flow_uri = f\"s3://{bucket}/{prefix}/{flow_name}.flow\"\n",
    "\n",
    "# Do not modify flow_file_name\n",
    "flow_file_name = \"/home/sagemaker-user/workshop/06_prepare/wip/99_transform_star_rating_to_sentiment.flow\"\n",
    "\n",
    "iam_role = sagemaker.get_execution_role()\n",
    "\n",
    "container_uri = \"663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.0.2\"\n",
    "\n",
    "# Processing Job Resources Configurations\n",
    "# Data wrangler processing job only supports 1 instance.\n",
    "instance_count = 1\n",
    "instance_type = \"ml.m5.xlarge\"\n",
    "\n",
    "# Processing Job Path URI Information\n",
    "output_prefix = f\"export-{flow_name}/output\"\n",
    "output_path = f\"s3://{bucket}/{output_prefix}\"\n",
    "output_name = \"2b17c16a-8c82-432a-9fec-11ed0b510539.default\"\n",
    "\n",
    "processing_dir = \"/opt/ml/processing\"\n",
    "\n",
    "# Modify the variable below to specify the content type to be used for writing each output\n",
    "# Currently supported options are 'CSV' or 'PARQUET', and the default is 'CSV'\n",
    "output_content_type = \"CSV\"\n",
    "\n",
    "# URL to use for sagemaker client.\n",
    "# If this is None, boto will automatically construct the appropriate URL to use\n",
    "# when communicating with sagemaker.\n",
    "sagemaker_endpoint_url = None"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Upload the Data Wrangler .flow file to Amazon S3 so that it can be used as an input to the\n",
    "processing job."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "ename": "FileNotFoundError",
     "evalue": "[Errno 2] No such file or directory: '/home/sagemaker-user/workshop/06_prepare/wip/99_transform_star_rating_to_sentiment.flow'",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mFileNotFoundError\u001b[0m                         Traceback (most recent call last)",
      "\u001b[0;32m<ipython-input-14-89c487417ed3>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m      2\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m      3\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 4\u001b[0;31m \u001b[0;32mwith\u001b[0m \u001b[0mopen\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mflow_file_name\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m      5\u001b[0m     \u001b[0mflow\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mjson\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mload\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m      6\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;31mFileNotFoundError\u001b[0m: [Errno 2] No such file or directory: '/home/sagemaker-user/workshop/06_prepare/wip/99_transform_star_rating_to_sentiment.flow'"
     ]
    }
   ],
   "source": [
    "# Load .flow file\n",
    "\n",
    "\n",
    "with open(flow_file_name) as f:\n",
    "    flow = json.load(f)\n",
    "\n",
    "# Upload to S3\n",
    "s3_client = boto3.client(\"s3\")\n",
    "s3_client.upload_file('/home/sagemaker-user/workshop/06_prepare/wip/99_transform_star_rating_to_sentiment.flow', bucket, f\"{prefix}/{flow_name}.flow\")\n",
    "\n",
    "print(f\"Data Wrangler Flow uploaded to {flow_uri}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "usage: aws s3 cp <LocalPath> <S3Uri> or <S3Uri> <LocalPath> or <S3Uri> <S3Uri>\n",
      "Error: Invalid argument type\n"
     ]
    }
   ],
   "source": [
    "!aws s3 cp \"99_transform_star_rating_to_sentiment.flow\" \"$bucket/$prefix/$flow_name.flow\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Boto3 Processing Job arguments\n",
    "\n",
    "This notebook submits a processing job using boto, which will require an argument dictionary to\n",
    "submit to the boto client. Below, utility methods are defined for creating processing job inputs\n",
    "for the following sources: S3, Athena, and Redshift. Then, the argument dictionary is generated\n",
    "using the parsed inputs and job configurations such as instance type."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_flow_notebook_processing_input(base_dir, flow_s3_uri):\n",
    "    return {\n",
    "        \"InputName\": \"flow\",\n",
    "        \"S3Input\": {\n",
    "            \"LocalPath\": f\"{base_dir}/flow\",\n",
    "            \"S3Uri\": flow_s3_uri,\n",
    "            \"S3DataType\": \"S3Prefix\",\n",
    "            \"S3InputMode\": \"File\",\n",
    "        },\n",
    "    }\n",
    "\n",
    "def create_s3_processing_input(base_dir, name, dataset_definition):\n",
    "    return {\n",
    "        \"InputName\": name,\n",
    "        \"S3Input\": {\n",
    "            \"LocalPath\": f\"{base_dir}/{name}\",\n",
    "            \"S3Uri\": dataset_definition[\"s3ExecutionContext\"][\"s3Uri\"],\n",
    "            \"S3DataType\": \"S3Prefix\",\n",
    "            \"S3InputMode\": \"File\",\n",
    "        },\n",
    "    }\n",
    "\n",
    "def create_redshift_processing_input(base_dir, name, dataset_definition):\n",
    "    return {\n",
    "        \"InputName\": name,\n",
    "        \"DatasetDefinition\": {\n",
    "            \"RedshiftDatasetDefinition\": {\n",
    "                \"ClusterId\": dataset_definition[\"clusterIdentifier\"],\n",
    "                \"Database\": dataset_definition[\"database\"],\n",
    "                \"DbUser\": dataset_definition[\"dbUser\"],\n",
    "                \"QueryString\": dataset_definition[\"queryString\"],\n",
    "                \"ClusterRoleArn\": dataset_definition[\"unloadIamRole\"],\n",
    "                \"OutputS3Uri\": f'{dataset_definition[\"s3OutputLocation\"]}{name}/',\n",
    "                \"OutputFormat\": dataset_definition[\"outputFormat\"].upper(),\n",
    "            },\n",
    "            \"LocalPath\": f\"{base_dir}/{name}\",\n",
    "        },\n",
    "    }\n",
    "\n",
    "def create_athena_processing_input(base_dir, name, dataset_definition):\n",
    "    return {\n",
    "        \"InputName\": name,\n",
    "        \"DatasetDefinition\": {\n",
    "            \"AthenaDatasetDefinition\": {\n",
    "                \"Catalog\": dataset_definition[\"catalogName\"],\n",
    "                \"Database\": dataset_definition[\"databaseName\"],\n",
    "                \"QueryString\": dataset_definition[\"queryString\"],\n",
    "                \"OutputS3Uri\": f'{dataset_definition[\"s3OutputLocation\"]}{name}/',\n",
    "                \"OutputFormat\": dataset_definition[\"outputFormat\"].upper(),\n",
    "            },\n",
    "            \"LocalPath\": f\"{base_dir}/{name}\",\n",
    "        },\n",
    "    }\n",
    "\n",
    "def create_processing_inputs(processing_dir, flow, flow_uri):\n",
    "    \"\"\"Helper function for creating processing inputs\n",
    "    :param flow: loaded data wrangler flow notebook\n",
    "    :param flow_uri: S3 URI of the data wrangler flow notebook\n",
    "    \"\"\"\n",
    "    processing_inputs = []\n",
    "    flow_processing_input = create_flow_notebook_processing_input(processing_dir, flow_uri)\n",
    "    processing_inputs.append(flow_processing_input)\n",
    "\n",
    "    for node in flow[\"nodes\"]:\n",
    "        if \"dataset_definition\" in node[\"parameters\"]:\n",
    "            data_def = node[\"parameters\"][\"dataset_definition\"]\n",
    "            name = data_def[\"name\"]\n",
    "            source_type = data_def[\"datasetSourceType\"]\n",
    "\n",
    "            if source_type == \"S3\":\n",
    "                s3_processing_input = create_s3_processing_input(\n",
    "                    processing_dir, name, data_def)\n",
    "                processing_inputs.append(s3_processing_input)\n",
    "            elif source_type == \"Athena\":\n",
    "                athena_processing_input = create_athena_processing_input(\n",
    "                    processing_dir, name, data_def)\n",
    "                processing_inputs.append(athena_processing_input)\n",
    "            elif source_type == \"Redshift\":\n",
    "                redshift_processing_input = create_redshift_processing_input(\n",
    "                    processing_dir, name, data_def)\n",
    "                processing_inputs.append(redshift_processing_input)\n",
    "            else:\n",
    "                raise ValueError(f\"{source_type} is not supported for Data Wrangler Processing.\")\n",
    "    return processing_inputs\n",
    "\n",
    "def create_container_arguments(output_name, output_content_type):\n",
    "    output_config = {\n",
    "        output_name: {\n",
    "            \"content_type\": output_content_type\n",
    "        }\n",
    "    }\n",
    "    return [f\"--output-config '{json.dumps(output_config)}'\"]\n",
    "\n",
    "# Create Processing Job Arguments\n",
    "processing_job_arguments = {\n",
    "    \"AppSpecification\": {\n",
    "        \"ContainerArguments\": create_container_arguments(output_name, output_content_type),\n",
    "        \"ImageUri\": container_uri,\n",
    "    },\n",
    "    \"ProcessingInputs\": create_processing_inputs(processing_dir, flow, flow_uri),\n",
    "    \"ProcessingOutputConfig\": {\n",
    "        \"Outputs\": [\n",
    "            {\n",
    "                \"OutputName\": output_name,\n",
    "                \"S3Output\": {\n",
    "                    \"S3Uri\": output_path,\n",
    "                    \"LocalPath\": os.path.join(processing_dir, \"output\"),\n",
    "                    \"S3UploadMode\": \"EndOfJob\",\n",
    "                }\n",
    "            },\n",
    "        ],\n",
    "    },\n",
    "    \"ProcessingResources\": {\n",
    "        \"ClusterConfig\": {\n",
    "            \"InstanceCount\": instance_count,\n",
    "            \"InstanceType\": instance_type,\n",
    "            \"VolumeSizeInGB\": 30,\n",
    "        }\n",
    "    },\n",
    "    \"RoleArn\": iam_role,\n",
    "    \"StoppingCondition\": {\n",
    "        \"MaxRuntimeInSeconds\": 86400,\n",
    "    },\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following cell creates a processing step using your exported Data Wrangler flow.\n",
    "This step will be used to create a SageMaker Pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.workflow.steps import ProcessingStep, Step, StepTypeEnum\n",
    "\n",
    "class NaiveStep(Step):\n",
    "\n",
    "    def __init__(self, name, step_type: StepTypeEnum, step_args):\n",
    "        self.name = name\n",
    "        self.step_type = step_type\n",
    "        self.step_args = step_args\n",
    "\n",
    "    def arguments(self):\n",
    "        raise NotImplementedError()\n",
    "\n",
    "    def properties(self):\n",
    "        raise NotImplementedError()\n",
    "\n",
    "    def to_request(self):\n",
    "        return {\n",
    "            'Name': self.name,\n",
    "            'Type': self.step_type.value,\n",
    "            'Arguments': self.step_args\n",
    "        }\n",
    "\n",
    "\n",
    "step_process = NaiveStep(\n",
    "    name=\"DataWranglerProcessingStep\",\n",
    "    step_type=StepTypeEnum.PROCESSING,\n",
    "    step_args=processing_job_arguments\n",
    ")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Workflow Creation\n",
    "The following cell defines a new pipeline with the processing step.\n",
    "Use this cell to add additional steps to the pipeline. To learn more about adding\n",
    "steps to a pipeline, see\n",
    "[Define a Pipeline](http://docs.aws.amazon.com/sagemaker/latest/dg/define-pipeline.html)\n",
    "in the SageMaker documentation.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "\n",
    "from sagemaker.workflow.parameters import (\n",
    "    ParameterInteger,\n",
    "    ParameterString,\n",
    ")\n",
    "from sagemaker.workflow.pipeline import Pipeline\n",
    "\n",
    "\n",
    "pipeline_name = f\"datawrangler-pipeline-{int(time.time() * 10**7)}\"\n",
    "instance_type = ParameterString(name=\"InstanceType\", default_value=\"ml.m5.4xlarge\")\n",
    "instance_count = ParameterInteger(name=\"InstanceCount\", default_value=1)\n",
    "\n",
    "boto_session = boto3.session.Session()\n",
    "region = boto_session.region_name\n",
    "\n",
    "sagemaker_client = boto_session.client(\"sagemaker\")\n",
    "runtime_client = boto_session.client(\"sagemaker-runtime\")\n",
    "\n",
    "sagemaker_session = sagemaker.session.Session(\n",
    "    boto_session=boto_session,\n",
    "    sagemaker_client=sagemaker_client,\n",
    "    sagemaker_runtime_client=runtime_client,\n",
    ")\n",
    "\n",
    "pipeline = Pipeline(\n",
    "    name=pipeline_name,\n",
    "    parameters=[instance_type, instance_count],\n",
    "    steps=[step_process],\n",
    "    sagemaker_session=sagemaker_session\n",
    ")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run the following to validate the pipeline definition."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "\n",
    "\n",
    "definition = json.loads(pipeline.definition())\n",
    "definition\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run Pipeline\n",
    "Use the following cell to submit a pipeline creation job.\n",
    "You can check the progress of the pipeline with the pipeline Amazon Resource Name (ARN).\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from botocore.exceptions import ClientError, ValidationError\n",
    "\n",
    "\n",
    "try:\n",
    "    response = pipeline.create(role_arn=iam_role)\n",
    "except ClientError as e:\n",
    "    error = e.response[\"Error\"]\n",
    "    if error[\"Code\"] == \"ValidationError\" and \"Pipeline names must be unique\" in error[\"Message\"]:\n",
    "        print(error[\"Message\"])\n",
    "        response = pipeline.describe()\n",
    "    else:\n",
    "        raise\n",
    "\n",
    "pipeline_arn = response[\"PipelineArn\"]\n",
    "print(pipeline_arn)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pipeline Operations: Examine and Wait for Pipeline Execution\n",
    "\n",
    "The higher-level resources of the pipeline instance provide a way for the Data Scientist and\n",
    "Machine Learning Engineer to define a workflow that can be executed by SageMaker.\n",
    "\n",
    "To monitor operations of this execution, we use the lower-level, raw workflow boto3 client of the\n",
    "pipeline to describe the pipeline execution and list the pipeline execution steps.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "start_response = pipeline.start()\n",
    "pipeline_execution_arn = start_response.arn\n",
    "print(pipeline_execution_arn)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pipeline Status\n",
    "You can use the function [describe_pipeline_execution][1] to monitor a pipeline's execution.\n",
    "to view a pipeline's execution status.To view a pipeline's execution steps, you can use\n",
    "[list_pipeline_execution_steps][2].The following cell checks the pipeline status and execution\n",
    "steps using these functions.\n",
    "\n",
    "[1]: https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html\n",
    "[2]: https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ListPipelineExecutionSteps.html\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pprint import pprint\n",
    "\n",
    "execution_response = sagemaker_session.sagemaker_client.describe_pipeline_execution(\n",
    "    PipelineExecutionArn=pipeline_execution_arn\n",
    ")\n",
    "print(\"Pipeline: {}.\".format(execution_response[\"PipelineExecutionStatus\"]))\n",
    "print()\n",
    "\n",
    "execution_steps_response = sagemaker_session.sagemaker_client.list_pipeline_execution_steps(\n",
    "    PipelineExecutionArn=pipeline_execution_arn\n",
    ")\n",
    "execution_steps = execution_steps_response[\"PipelineExecutionSteps\"]\n",
    "print(\"Execution steps:\")\n",
    "pprint(execution_steps)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use the following cells to define and run a function that waits until the pipeline execution status\n",
    "changes to a terminal state: `Failed` or `Succeeded`.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import botocore.waiter\n",
    "\n",
    "\n",
    "def get_waiter(pipeline, delay=24, max_attempts=60):\n",
    "    waiter_id = \"PipelineExecutionComplete\"\n",
    "    model = botocore.waiter.WaiterModel({\n",
    "        \"version\": 2,\n",
    "        \"waiters\": {\n",
    "            waiter_id: {\n",
    "                \"delay\": delay,\n",
    "                \"maxAttempts\": max_attempts,\n",
    "                \"operation\": 'DescribePipelineExecution',\n",
    "                \"acceptors\": [\n",
    "                    {\n",
    "                        \"expected\": \"Succeeded\",\n",
    "                        \"matcher\": \"path\",\n",
    "                        \"state\": \"success\",\n",
    "                        \"argument\": \"PipelineExecutionStatus\"\n",
    "                    },\n",
    "                    {\n",
    "                        \"expected\": \"Failed\",\n",
    "                        \"matcher\": \"path\",\n",
    "                        \"state\": \"failure\",\n",
    "                        \"argument\": \"PipelineExecutionStatus\"\n",
    "                    },\n",
    "                ]\n",
    "            }\n",
    "        }\n",
    "    })\n",
    "    return botocore.waiter.create_waiter_with_client(\n",
    "        waiter_id, model, sagemaker_session.sagemaker_client\n",
    "    )\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "waiter = get_waiter(pipeline)\n",
    "waiter.wait(PipelineExecutionArn=pipeline_execution_arn)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "execution_steps_response = sagemaker_session.sagemaker_client.list_pipeline_execution_steps(\n",
    "    PipelineExecutionArn=pipeline_execution_arn\n",
    ")\n",
    "execution_steps = execution_steps_response[\"PipelineExecutionSteps\"]\n",
    "print(\"Execution steps:\")\n",
    "pprint(execution_steps)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Cleanup\n",
    "Uncomment the following code cell to revert the SageMaker Python SDK to the original version used\n",
    "before running this notebook. This notebook upgrades the SageMaker Python SDK to 2.x, which may\n",
    "cause other example notebooks to break. To learn more about the changes introduced in the\n",
    "SageMaker Python SDK 2.x update, see\n",
    "[Use Version 2.x of the SageMaker Python SDK.](https://sagemaker.readthedocs.io/en/stable/v2.html)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# _ = subprocess.check_call(\n",
    "#     [sys.executable, \"-m\", \"pip\", \"install\", f\"sagemaker=={original_version}\"]\n",
    "# )"
   ]
  }
 ],
 "metadata": {
  "instance_type": "ml.t3.medium",
  "kernelspec": {
   "display_name": "Python 3 (Data Science)",
   "language": "python",
   "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
